/*
Copyright 2020 The Magma Authors.

This source code is licensed under the BSD-style license found in the
LICENSE file in the root directory of this source tree.

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package servicers

import (
	"fmt"

	"magma/orc8r/cloud/go/identity"
	"magma/orc8r/cloud/go/services/streamer"
	"magma/orc8r/cloud/go/services/streamer/providers"
	"magma/orc8r/lib/go/protos"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// stringifiedEAGAIN is the "%s" rendering of streamer.EAGAIN, computed once
// at package initialization. normalizeError compares gRPC status messages
// against it to recognize EAGAIN.
var stringifiedEAGAIN = fmt.Sprintf("%s", streamer.EAGAIN)

// streamerServicer is the type NewStreamerServicer returns as a
// protos.StreamerServer. It has no fields.
type streamerServicer struct{}

// NewStreamerServicer returns a new streamerServicer as a protos.StreamerServer.
func NewStreamerServicer() protos.StreamerServer {
	return new(streamerServicer)
}

// GetUpdates is the streaming entry point that works with the caller's
// gateway identity. It rejects a nil request with InvalidArgument, resolves the
// gateway identity from the stream, overwrites the request's GatewayId with
// that identity's hardware ID and then hands the request over to
// GetUpdatesUnverified, which looks up the stream provider and streams its
// updates.
//
// An error from the identity lookup is returned unchanged; an identity with an
// empty hardware ID is rejected with FailedPrecondition.
func (srv *streamerServicer) GetUpdates(request *protos.StreamRequest, stream protos.Streamer_GetUpdatesServer) error {
	if request == nil {
		return status.Error(codes.InvalidArgument, "nil request")
	}

	// The stream must yield a gateway identity that carries a hardware ID.
	gw, err := identity.GetStreamGatewayId(stream)
	switch {
	case err != nil:
		return err
	case gw.HardwareId == "":
		return status.Error(codes.FailedPrecondition, "Gateway ID is empty")
	}

	// Always replace whatever gateway ID the client supplied with the verified
	// identity from Certifier: older gateways fill in their own hardware ID,
	// newer ones may leave it blank, and in both cases only the verified value
	// is used from here on.
	request.GatewayId = gw.HardwareId
	return GetUpdatesUnverified(request, stream)
}

// GetUpdatesUnverified finds the stream provider for the stream name in the
// request and streams the update batches generated by that provider. It uses
// the request's gateway ID as given, without checking it.
//
// Each provider call is handled as follows:
//   - error == nil: the update batch is sent to the client and the stream is
//     closed (nil is returned).
//   - error == EAGAIN (after normalizeError): the update batch is sent to the
//     client and streaming continues on the same stream.
//   - any other error: streaming stops with an Aborted status; no batch is
//     sent for that call.
//
// It returns Unavailable when no provider exists for the stream name and
// Internal when sending a batch to the client fails.
func GetUpdatesUnverified(request *protos.StreamRequest, stream protos.Streamer_GetUpdatesServer) error {
	streamName := request.GetStreamName()
	provider, err := providers.GetStreamProvider(streamName)
	if err != nil {
		return status.Errorf(codes.Unavailable, "stream %s does not exist", streamName)
	}

	for {
		// Read every request field through its generated getter: this function
		// is exported and does not nil-check request, and the getters tolerate a
		// nil receiver whereas a direct field access would panic.
		updates, updErr := provider.GetUpdates(request.GetGatewayId(), request.GetExtraArgs())
		updErr = normalizeError(updErr)
		if updErr != nil && updErr != streamer.EAGAIN {
			return status.Errorf(codes.Aborted, "error while streaming updates: %s", updErr)
		}

		batch := &protos.DataUpdateBatch{Updates: updates, Resync: true}
		if sendErr := stream.Send(batch); sendErr != nil {
			return status.Errorf(codes.Internal, "error sending update batch %+v: %v", batch, sendErr)
		}

		// A nil provider error marks the final batch; EAGAIN asks for another round.
		if updErr == nil {
			return nil
		}
	}
}

// normalizeError maps an error whose gRPC status message equals the string
// form of streamer.EAGAIN back to the streamer.EAGAIN value itself, so callers
// can compare against it directly. Every other error, including nil, is
// returned unchanged.
func normalizeError(err error) error {
	// The ok flag is deliberately ignored: only the status message is compared.
	grpcStatus, _ := status.FromError(err)
	if grpcStatus.Message() != stringifiedEAGAIN {
		return err
	}
	return streamer.EAGAIN
}
